package com.charlotte.gupao.study.comsumer;

import com.charlotte.gupao.study.support.AmqpConnection;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author junjie.ding
 * @date 2020/10/29 21:35
 * @description TODO
 */
public class FanoutConsumer1 {

    private static final String QUEUE_NAME = "FANOUT_QUEUE_NAME_1";

    private static final String EXCHANGE_NAME = "FANOUT_EXCHANGE_NAME";

    public static void main(String[] args) {
        Connection conn;
        final Channel channel;
        try {
            conn = AmqpConnection.getConnection();
            // 创建消息通道
            channel = conn.createChannel();

            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, null);
            // 声明队列（默认交换机AMQP default，Direct）
            // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");

            // 创建消费者，并接收消息
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("接收消息 : '" + msg + "'");

                    if (msg.contains("拒收")){
                        // 拒绝消息
                        // requeue：是否重新入队列，true：是；false：直接丢弃，相当于告诉队列可以直接删除掉
                        // TODO 如果只有这一个消费者，requeue 为true 的时候会造成消息重复消费
                        channel.basicReject(envelope.getDeliveryTag(), false);
                    } else if (msg.contains("异常")){
                        // 批量拒绝
                        // requeue：是否重新入队列
                        // TODO 如果只有这一个消费者，requeue 为true 的时候会造成消息重复消费
                        channel.basicNack(envelope.getDeliveryTag(), true, false);
                    } else {
                        // 手工应答
                        // 如果不应答，队列中的消息会一直存在，重新连接的时候会重复消费
                        channel.basicAck(envelope.getDeliveryTag(), true);
                    }
                }
            };

            // 开始获取消息，注意这里开启了手工应答
            // String queue, boolean autoAck, Consumer callback
            channel.basicConsume(QUEUE_NAME, false, consumer);

//            channel.close();
//            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {

        }
    }
}
